前面我們建立了基礎對話系統、RAG 檢索和即時推播。但目前的 AI 助手還只能「聊天」和「查資料」。這篇要用 Vertex AI Agent Builder + Gemini Function Calling 讓 AI 真正變聰明:會使用工具、能執行複雜任務、可以自主規劃工作流程。
目標:打造一個會寫代碼、會發郵件、會分析數據、會安排會議的超級 AI 助手。
flowchart TB
subgraph "用戶交互層"
USER[用戶請求]
FIREBASE[Firebase 即時推播]
end
subgraph "AI 雙核心架構"
GEMINI[Gemini Function Calling<br/>工具調用決策中心]
DISCOVERY[Discovery Engine<br/>知識檢索引擎]
end
subgraph "工具生態系統"
WORKSPACE[Google Workspace<br/>Gmail, Calendar]
ANALYSIS[數據分析工具<br/>BigQuery, Sheets]
CODE[代碼執行環境<br/>安全沙盒]
EXTERNAL[外部系統<br/>CRM, ERP, APIs]
end
subgraph "原有系統整合"
CHAT[chat-service]
MEMORY[記憶系統]
end
USER --> CHAT
CHAT --> GEMINI
GEMINI --> WORKSPACE
GEMINI --> ANALYSIS
GEMINI --> CODE
GEMINI --> EXTERNAL
GEMINI <--> DISCOVERY
GEMINI <--> MEMORY
GEMINI --> FIREBASE
服務 | 職責 | 技術實現 |
---|---|---|
Gemini Function Calling | 工具調用、任務規劃、邏輯推理 | Vertex AI GenerativeModel + FunctionDeclaration |
Discovery Engine | 知識檢索、文件問答 | 正確的 DataStore 路徑 + ConversationalSearch |
工具執行器 | 實際操作執行 | Cloud Run Jobs + 安全沙盒 |
Firebase 推播 | 即時進度更新 | 與前篇完美整合 |
#!/bin/bash
# scripts/setup-discovery-engine.sh
#
# Enables the Discovery Engine API, prints the manual steps for creating the
# data store in the console, and grants the chat service account read access.

# Stop at the first failing command, unset variable or failed pipeline stage so
# the success banner at the end is only printed when every step worked.
set -euo pipefail

PROJECT_ID="your-project-id"
# Not used by any command below; kept so the value chosen in the console can be
# recorded next to the data store ID.
LOCATION="global"
DATASTORE_ID="ai-assistant-knowledge"

echo "🔍 建立 Discovery Engine 資料存儲..."

# 1. Enable the API in the target project (not whatever gcloud's default is)
gcloud services enable discoveryengine.googleapis.com --project="$PROJECT_ID"

# 2. Create the data store (done through the Console or the API, not a gcloud command)
echo "📝 請前往 Discovery Engine Console 建立資料存儲:"
echo "1. 前往: https://console.cloud.google.com/ai/discovery-engine"
echo "2. 建立新的搜尋應用"
echo "3. 選擇「通用」垂直領域"
echo "4. 資料存儲 ID: $DATASTORE_ID"
echo "5. 內容配置: 需要內容"
echo "6. 啟用「企業搜尋服務層級」"

# 3. Grant the chat service account read-only access to Discovery Engine
gcloud projects add-iam-policy-binding "$PROJECT_ID" \
  --member="serviceAccount:chat-service-sa@${PROJECT_ID}.iam.gserviceaccount.com" \
  --role="roles/discoveryengine.viewer"

echo "✅ Discovery Engine 設定完成"
echo "📋 記住資料存儲 ID: $DATASTORE_ID"
# services/chat/app/discovery_client.py
from google.cloud import discoveryengine_v1 as discoveryengine
from typing import Dict, Any, List, Optional
import logging
logger = logging.getLogger(__name__)
class DiscoveryEngineClient:
    """Wrapper around the Discovery Engine search and conversational-search clients."""

    def __init__(self, project_id: str, location: str = "global", datastore_id: str = "ai-assistant-knowledge"):
        """Store the identifiers, derive the data store path and create both clients.

        Args:
            project_id: Google Cloud project that owns the data store.
            location: Location segment of the resource path.
            datastore_id: ID of the data store to query.
        """
        self.project_id, self.location, self.datastore_id = project_id, location, datastore_id

        # Every other resource name (serving config, conversation) is built on this prefix
        path_segments = (
            "projects", project_id,
            "locations", location,
            "collections", "default_collection",
            "dataStores", datastore_id,
        )
        self.datastore_path = "/".join(path_segments)

        self.search_client = discoveryengine.SearchServiceClient()
        self.conversation_client = discoveryengine.ConversationalSearchServiceClient()
async def search_documents(self, query: str, page_size: int = 5) -> Dict[str, Any]:
    """Search the data store and return a simplified list of hits.

    Args:
        query: Free-text search query.
        page_size: Number of results to request; coerced to an int >= 1.

    Returns:
        ``{"results": [...], "total_results": n}``. Each result has ``id``,
        ``title``, ``content``, ``uri`` and ``score``. On any failure the error
        is logged and an empty result set is returned (best effort).
    """
    import asyncio  # local import: this module does not import asyncio at file level

    try:
        serving_config = f"{self.datastore_path}/servingConfigs/default_config"
        request = discoveryengine.SearchRequest(
            serving_config=serving_config,
            query=query,
            # Callers may forward model-supplied numbers (floats); the field is an integer
            page_size=max(1, int(page_size)),
            query_expansion_spec=discoveryengine.SearchRequest.QueryExpansionSpec(
                condition=discoveryengine.SearchRequest.QueryExpansionSpec.Condition.AUTO
            ),
            spell_correction_spec=discoveryengine.SearchRequest.SpellCorrectionSpec(
                mode=discoveryengine.SearchRequest.SpellCorrectionSpec.Mode.AUTO
            ),
        )

        def _run_search():
            # The client is synchronous; run it in a worker thread so the event
            # loop is not blocked for the duration of the network call.
            return list(self.search_client.search(request).results)

        hits = await asyncio.to_thread(_run_search)

        results = []
        for hit in hits:
            document = hit.document
            # Struct fields can come back unset; normalise to empty mappings
            struct = getattr(document, 'struct_data', None) or {}
            derived = getattr(document, 'derived_struct_data', None) or {}
            results.append({
                "id": document.id,
                # NOTE(review): derived_struct_data "title"/"link" keys are assumed
                # from the unstructured-data response shape — confirm for this store.
                "title": struct.get("title") or derived.get("title", ""),
                "content": self._extract_content(document),
                "uri": getattr(document, 'uri', '') or derived.get("link", ""),
                "score": getattr(hit, 'relevance_score', 0.0),
            })

        return {
            "results": results,
            "total_results": len(results)
        }
    except Exception as e:
        logger.error(f"Discovery Engine 檢索失敗: {e}")
        return {"results": [], "total_results": 0}
async def conversational_search(
    self,
    query: str,
    conversation_id: str,
    user_pseudo_id: str
) -> Dict[str, Any]:
    """Run one conversational-search turn against the data store.

    Args:
        query: The user's message for this turn.
        conversation_id: Conversation segment of the resource name.
        user_pseudo_id: Pseudonymous user identifier forwarded to the API.

    Returns:
        ``{"conversation_id", "reply", "search_results"}``. On failure the
        error is logged and a fixed apology reply with no results is returned.
    """
    import asyncio  # local import: this module does not import asyncio at file level

    try:
        # NOTE(review): the caller passes its own chat ID here. The API may
        # require an existing conversation (or "-" for an auto-created
        # session) — confirm against the ConverseConversation reference.
        conversation_name = f"{self.datastore_path}/conversations/{conversation_id}"
        serving_config = f"{self.datastore_path}/servingConfigs/default_config"
        request = discoveryengine.ConverseConversationRequest(
            name=conversation_name,
            query=discoveryengine.TextInput(input=query),
            serving_config=serving_config,
            user_pseudo_id=user_pseudo_id
        )

        # Synchronous client: keep the event loop free while the call is in flight
        response = await asyncio.to_thread(
            self.conversation_client.converse_conversation, request
        )

        search_results = []
        for result in response.search_results:
            # Title/content/link live on the wrapped document, not on the
            # search result itself; struct fields may be unset.
            document = result.document
            struct = getattr(document, 'struct_data', None) or {}
            derived = getattr(document, 'derived_struct_data', None) or {}
            search_results.append({
                "title": struct.get("title") or derived.get("title", ""),
                "content": struct.get("content", ""),
                "uri": derived.get("link", ""),
            })

        return {
            "conversation_id": response.conversation.name.split('/')[-1],
            "reply": response.reply.summary.summary_text if response.reply.summary else "",
            "search_results": search_results
        }
    except Exception as e:
        logger.error(f"對話式檢索失敗: {e}")
        return {
            "conversation_id": conversation_id,
            "reply": "抱歉,檢索服務暫時無法使用。",
            "search_results": []
        }
def _extract_content(self, document) -> str:
"""提取文件內容"""
if hasattr(document, 'struct_data') and 'content' in document.struct_data:
return document.struct_data['content']
elif hasattr(document, 'json_data'):
return document.json_data
else:
return str(document)
# services/chat/app/gemini_function_handler.py
import asyncio
import json
import logging
from typing import Dict, Any, List, Optional
from vertexai.generative_models import GenerativeModel, FunctionDeclaration, Tool, Part
import vertexai
from shared.firebase_client import get_firebase_client
from .discovery_client import DiscoveryEngineClient
logger = logging.getLogger(__name__)
class GeminiFunctionHandler:
    """Runs Gemini function calling: declares tools, executes them, reports progress."""

    def __init__(self, project_id: str, location: str = "asia-east1"):
        """Initialise Vertex AI, the helper clients and the tool-enabled model.

        Args:
            project_id: Google Cloud project used for Vertex AI and Discovery Engine.
            location: Vertex AI location passed to ``vertexai.init``.
        """
        self.project_id, self.location = project_id, location

        vertexai.init(project=project_id, location=location)

        self.firebase = get_firebase_client()
        self.discovery = DiscoveryEngineClient(project_id)

        # The tool declarations are built first because the model takes them
        # as a constructor argument.
        declared_tools = self._create_tools()
        self.tools = declared_tools
        self.model = GenerativeModel(
            "gemini-1.5-pro",
            tools=declared_tools,
            system_instruction=self._get_system_instruction(),
        )
def _create_tools(self) -> List[Tool]:
    """Declare the five callable tools and bundle them into a single ``Tool``.

    Returns:
        A one-element list holding a ``Tool`` with the declarations for
        analyze_data, send_email, schedule_meeting, execute_code and
        search_knowledge, in that order.
    """

    # Small builders so each schema below reads as a table of fields.
    def _object(properties, required):
        return {"type": "object", "properties": properties, "required": required}

    def _string(description, **extra):
        return {"type": "string", "description": description, **extra}

    def _integer(description, default):
        return {"type": "integer", "description": description, "default": default}

    def _string_list(description, **item_extra):
        return {
            "type": "array",
            "items": {"type": "string", **item_extra},
            "description": description,
        }

    specs = [
        # Data analysis
        ("analyze_data", "分析數據並生成報告", _object(
            {
                "data_source": _string("數據來源(BigQuery 表名、Sheets URL 等)"),
                "analysis_type": _string(
                    "分析類型:描述性、預測性、比較性",
                    enum=["descriptive", "predictive", "comparative"],
                ),
                "output_format": _string(
                    "輸出格式:摘要、圖表、表格",
                    enum=["summary", "chart", "table"],
                ),
            },
            ["data_source", "analysis_type"],
        )),
        # E-mail
        ("send_email", "發送郵件給指定收件人", _object(
            {
                "recipients": _string_list("收件人郵箱地址列表"),
                "subject": _string("郵件主題"),
                "body": _string("郵件內容"),
                "body_type": _string("郵件內容格式", enum=["text", "html"], default="html"),
            },
            ["recipients", "subject", "body"],
        )),
        # Meetings
        ("schedule_meeting", "安排會議並邀請與會者", _object(
            {
                "title": _string("會議標題"),
                "attendees": _string_list("與會者郵箱地址"),
                "start_time": _string("開始時間(本地時間,格式:YYYY-MM-DDTHH:MM:SS)"),
                "duration_minutes": _integer("會議時長(分鐘)", 60),
                "description": _string("會議描述"),
                "timezone": _string("時區", default="Asia/Taipei"),
            },
            ["title", "attendees", "start_time"],
        )),
        # Code execution
        ("execute_code", "在安全環境中執行 Python 代碼", _object(
            {
                "code": _string("要執行的 Python 代碼"),
                "libraries": _string_list(
                    "需要的 Python 庫",
                    enum=["pandas", "numpy", "matplotlib", "seaborn", "scipy"],
                ),
                "description": _string("代碼功能描述"),
            },
            ["code"],
        )),
        # Knowledge search
        ("search_knowledge", "搜索企業知識庫", _object(
            {
                "query": _string("搜索查詢"),
                "max_results": _integer("最大結果數", 5),
            },
            ["query"],
        )),
    ]

    declarations = [
        FunctionDeclaration(name=name, description=description, parameters=parameters)
        for name, description, parameters in specs
    ]
    return [Tool(function_declarations=declarations)]
def _get_system_instruction(self) -> str:
"""系統指令"""
return """
你是一個企業級智慧助手,具備以下核心能力:
## 🧠 智慧分析與決策
- 能夠分析複雜任務並分解為可執行的步驟
- 會根據情況選擇最適合的工具組合
- 具備邏輯推理和問題解決能力
## 🛠️ 工具使用原則
1. 執行任何操作前,先確認用戶需求和權限
2. 選擇最有效率的工具組合完成任務
3. 執行前說明計劃,執行中報告進度
4. 出錯時提供替代方案和解決建議
5. 涉及敏感操作時需要明確確認
## 📊 專業能力
- 數據分析:Excel、CSV、BigQuery 數據處理
- 辦公自動化:郵件發送、會議安排、文件管理
- 程式開發:Python 代碼編寫和執行
- 知識管理:企業文件檢索和問答
## 🔒 安全與合規
- 嚴格遵守數據保護和隱私規定
- 不處理敏感個資或機密信息
- 執行重要操作前需要用戶確認
- 記錄所有操作日誌用於審計
請始終保持專業、準確、有用的回應風格。
""".strip()
async def process_with_tools(
self,
user_message: str,
chat_id: str,
user_id: str,
conversation_history: List[Dict] = None
) -> Dict[str, Any]:
"""處理帶工具調用的對話"""
try:
# 更新進度
await self.firebase.update_chat_progress(
chat_id, 'gemini_thinking', 10, 'processing', 'AI 正在分析任務...'
)
# 建立對話歷史
conversation = []
if conversation_history:
for msg in conversation_history[-5:]: # 只保留最近 5 輪對話
conversation.append({
"role": "user" if msg["role"] == "user" else "model",
"parts": [msg["content"]]
})
# 添加當前用戶訊息
conversation.append({
"role": "user",
"parts": [user_message]
})
# 第一次調用 Gemini
response = self.model.generate_content(conversation)
await self.firebase.update_chat_progress(
chat_id, 'analyzing_request', 20, 'processing', '正在分析是否需要使用工具...'
)
# 檢查是否有 function calls
if self._has_function_calls(response):
return await self._handle_function_calls(
response, conversation, chat_id, user_id
)
else:
# 沒有工具調用,直接回應
await self.firebase.complete_chat(
chat_id,
response.text,
processing_time_ms=0
)
return {
"message": response.text,
"has_tool_calls": False,
"is_complete": True
}
except Exception as e:
logger.error(f"Gemini 處理失敗: {e}")
await self.firebase.error_chat(chat_id, str(e))
raise
def _has_function_calls(self, response) -> bool:
"""檢查回應是否包含 function calls"""
try:
if not response.candidates:
return False
candidate = response.candidates[0]
if not candidate.content or not candidate.content.parts:
return False
for part in candidate.content.parts:
if hasattr(part, 'function_call') and part.function_call:
return True
return False
except:
return False
async def _handle_function_calls(
    self,
    response,
    conversation: List[Dict],
    chat_id: str,
    user_id: str
) -> Dict[str, Any]:
    """Execute the tools requested by the model and produce the final answer.

    Args:
        response: Model response whose first candidate holds the function calls.
        conversation: Conversation sent so far; the model turn and the
            function responses are appended to it in place.
        chat_id: Chat session used for progress and completion updates.
        user_id: Passed to tool execution as context.

    Returns:
        A dict with ``message``, ``has_tool_calls``, ``is_complete`` and,
        when tools ran, ``tool_results``.
    """
    from collections.abc import Mapping, Sequence

    def _plain(value):
        """Recursively turn mapping/sequence wrappers into plain dicts and lists."""
        if isinstance(value, Mapping):
            return {key: _plain(item) for key, item in value.items()}
        if isinstance(value, Sequence) and not isinstance(value, (str, bytes)):
            return [_plain(item) for item in value]
        return value

    def _text_or(model_response, fallback):
        """Return the response text, or ``fallback`` when it has no text part."""
        try:
            return model_response.text or fallback
        except (ValueError, AttributeError):
            return fallback

    # Only the first candidate is echoed back to the model below, so only its
    # calls are executed; otherwise responses would not match the model turn.
    first_candidate = response.candidates[0] if response.candidates else None
    model_parts = []
    if first_candidate is not None and first_candidate.content and first_candidate.content.parts:
        model_parts = list(first_candidate.content.parts)

    function_calls = [
        {"name": part.function_call.name, "args": _plain(part.function_call.args) if part.function_call.args else {}}
        for part in model_parts
        if getattr(part, 'function_call', None)
    ]

    if not function_calls:
        return {
            "message": _text_or(response, "我無法處理這個請求。"),
            "has_tool_calls": False,
            "is_complete": True
        }

    await self.firebase.update_chat_progress(
        chat_id, 'executing_tools', 40, 'processing',
        f'正在執行 {len(function_calls)} 個工具...'
    )

    tool_results = []
    for i, func_call in enumerate(function_calls):
        # Tool execution covers the 40%..80% band of the progress bar
        progress = 40 + int((i / len(function_calls)) * 40)
        await self.firebase.update_chat_progress(
            chat_id, 'executing_tools', progress, 'processing',
            f'正在執行: {func_call["name"]}'
        )
        try:
            if func_call["name"] == "search_knowledge":
                result = await self._handle_knowledge_search(func_call["args"])
            else:
                result = await self._execute_tool(
                    func_call["name"],
                    func_call["args"],
                    {"chat_id": chat_id, "user_id": user_id}
                )
        except Exception as e:
            # Report the failure to the model instead of aborting the whole
            # turn, so it can explain the error or suggest an alternative.
            logger.error(f"工具 {func_call['name']} 執行失敗: {e}")
            result = {"success": False, "error": str(e)}
        tool_results.append({
            "name": func_call["name"],
            "result": result
        })

    function_responses = [
        Part.from_function_response(
            name=tool_result["name"],
            response=tool_result["result"]
        )
        for tool_result in tool_results
    ]

    conversation.append({
        "role": "model",
        "parts": model_parts
    })
    conversation.append({
        "role": "function",
        "parts": function_responses
    })

    await self.firebase.update_chat_progress(
        chat_id, 'generating_response', 90, 'processing',
        '正在整合結果並生成最終回應...'
    )

    # Blocking network call: keep it off the event loop
    final_response = await asyncio.to_thread(self.model.generate_content, conversation)
    # The follow-up response may carry no text (e.g. it asks for more tools);
    # the tools have already run, so answer with a fallback instead of failing.
    final_text = _text_or(final_response, "工具已執行完成,但無法生成文字摘要。")

    await self.firebase.complete_chat(
        chat_id,
        final_text,
        processing_time_ms=0
    )
    return {
        "message": final_text,
        "has_tool_calls": True,
        "tool_results": tool_results,
        "is_complete": True
    }
async def _handle_knowledge_search(self, args: Dict[str, Any]) -> Dict[str, Any]:
"""處理知識檢索"""
try:
query = args.get("query", "")
max_results = args.get("max_results", 5)
results = await self.discovery.search_documents(query, max_results)
return {
"success": True,
"results": results["results"],
"total_found": results["total_results"]
}
except Exception as e:
return {
"success": False,
"error": str(e)
}
async def _execute_tool(
self,
tool_name: str,
parameters: Dict[str, Any],
context: Dict[str, Any]
) -> Dict[str, Any]:
"""執行工具(調用工具執行器服務)"""
# 這裡會調用實際的工具執行器
# 為簡化示例,直接返回模擬結果
return {
"success": True,
"message": f"工具 {tool_name} 執行完成",
"parameters": parameters
}
# shared/workspace_tools.py
import asyncio
import logging
from typing import Dict, Any, List, Optional
from datetime import datetime, timedelta
import pytz
from google.oauth2 import service_account
from googleapiclient.discovery import build
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import base64
logger = logging.getLogger(__name__)
class WorkspaceTools:
    """Gmail and Calendar helpers that act on behalf of users via domain-wide delegation."""

    def __init__(self, service_account_file: str, admin_email: str):
        """Remember the key file, the admin address and the OAuth scopes to request.

        Args:
            service_account_file: Path to the service account JSON key.
            admin_email: Workspace admin address (stored; not used by this constructor).
        """
        self.service_account_file = service_account_file
        self.admin_email = admin_email
        # Scopes requested for every delegated credential: send mail + full calendar access
        scope_root = 'https://www.googleapis.com/auth/'
        self.scopes = [scope_root + 'gmail.send', scope_root + 'calendar']
def _get_delegated_credentials(self, user_email: str):
    """Load the service account key and return credentials impersonating ``user_email``."""
    return service_account.Credentials.from_service_account_file(
        self.service_account_file,
        scopes=self.scopes,
    ).with_subject(user_email)
async def send_email(
self,
sender_email: str,
recipients: List[str],
subject: str,
body: str,
body_type: str = "html"
) -> Dict[str, Any]:
"""發送郵件"""
try:
# 獲取代理憑證
credentials = self._get_delegated_credentials(sender_email)
# 建立 Gmail 服務
service = build('gmail', 'v1', credentials=credentials)
# 建立郵件
message = MIMEMultipart()
message['to'] = ', '.join(recipients)
message['subject'] = subject
# 添加郵件內容
msg_body = MIMEText(body, body_type, 'utf-8')
message.attach(msg_body)
# 編碼郵件
raw_message = base64.urlsafe_b64encode(
message.as_bytes()
).decode('utf-8')
# 發送郵件
send_result = service.users().messages().send(
userId='me',
body={'raw': raw_message}
).execute()
return {
"success": True,
"message_id": send_result.get('id'),
"recipients": recipients,
"subject": subject
}
except Exception as e:
logger.error(f"發送郵件失敗: {e}")
return {
"success": False,
"error": str(e)
}
async def schedule_meeting(
    self,
    organizer_email: str,
    title: str,
    attendees: List[str],
    start_time: str,
    duration_minutes: int = 60,
    timezone: str = "Asia/Taipei",
    description: str = ""
) -> Dict[str, Any]:
    """Create a Calendar event with a Meet link on the organizer's primary calendar.

    Args:
        organizer_email: User to impersonate as the event owner.
        title: Event summary.
        attendees: Attendee e-mail addresses.
        start_time: ISO 8601 start time. Naive values are interpreted in
            ``timezone``; offset-aware values are converted to it.
        duration_minutes: Event length in minutes.
        timezone: IANA time zone name used for the event.
        description: Optional event description.

    Returns:
        Event details with ``success: True``, or ``{"success": False, "error"}``.
    """
    import uuid  # local import: this module does not import uuid at file level

    try:
        tz = pytz.timezone(timezone)

        start_dt = datetime.fromisoformat(start_time)
        if start_dt.tzinfo is None:
            start_dt = tz.localize(start_dt)
        else:
            # The event is written as wall-clock time + timeZone, so an aware
            # value in another offset must be converted first; otherwise its
            # clock reading would be relabelled with the wrong zone.
            start_dt = start_dt.astimezone(tz)

        end_dt = start_dt + timedelta(minutes=duration_minutes)

        event = {
            'summary': title,
            'description': description,
            'start': {
                'dateTime': start_dt.strftime('%Y-%m-%dT%H:%M:%S'),
                'timeZone': timezone
            },
            'end': {
                'dateTime': end_dt.strftime('%Y-%m-%dT%H:%M:%S'),
                'timeZone': timezone
            },
            'attendees': [
                {'email': email, 'responseStatus': 'needsAction'}
                for email in attendees
            ],
            'reminders': {
                'useDefault': False,
                'overrides': [
                    {'method': 'email', 'minutes': 24 * 60},  # one day before
                    {'method': 'popup', 'minutes': 10}
                ]
            },
            'conferenceData': {
                'createRequest': {
                    # Must be unique per request; a second-resolution timestamp
                    # collides when two meetings are created in the same second.
                    'requestId': f"meet-{uuid.uuid4().hex}",
                    'conferenceSolutionKey': {'type': 'hangoutsMeet'}
                }
            }
        }

        def _insert():
            # Blocking credential load + API call, executed in a worker thread
            credentials = self._get_delegated_credentials(organizer_email)
            service = build('calendar', 'v3', credentials=credentials)
            return service.events().insert(
                calendarId='primary',
                body=event,
                conferenceDataVersion=1
            ).execute()

        created_event = await asyncio.to_thread(_insert)
        return {
            "success": True,
            "event_id": created_event.get('id'),
            "title": title,
            "start_time": start_dt.isoformat(),
            "end_time": end_dt.isoformat(),
            "attendees": attendees,
            "meet_link": created_event.get('hangoutLink', ''),
            "calendar_link": created_event.get('htmlLink', '')
        }
    except Exception as e:
        logger.error(f"安排會議失敗: {e}")
        return {
            "success": False,
            "error": str(e)
        }
async def check_calendar_conflicts(
    self,
    user_email: str,
    start_time: str,
    end_time: str,
    timezone: str = "Asia/Taipei"
) -> Dict[str, Any]:
    """Query free/busy information for ``user_email`` in the given window.

    Args:
        user_email: Calendar owner to impersonate and to query.
        start_time: Window start (ISO 8601). Naive values are interpreted in ``timezone``.
        end_time: Window end, same rules as ``start_time``.
        timezone: IANA time zone name used for naive inputs and for the response.

    Returns:
        ``{"success": True, "has_conflicts", "conflicts"}`` or
        ``{"success": False, "error"}``.
    """
    try:
        tz = pytz.timezone(timezone)

        def _with_offset(value: str) -> str:
            # The query bounds need an explicit UTC offset. Values that cannot
            # be parsed here are passed through unchanged so inputs that
            # worked before keep working.
            try:
                moment = datetime.fromisoformat(value)
            except ValueError:
                return value
            if moment.tzinfo is None:
                moment = tz.localize(moment)
            return moment.isoformat()

        body = {
            "timeMin": _with_offset(start_time),
            "timeMax": _with_offset(end_time),
            "timeZone": timezone,
            "items": [{"id": user_email}]
        }

        def _query():
            # Blocking credential load + API call, executed in a worker thread
            credentials = self._get_delegated_credentials(user_email)
            service = build('calendar', 'v3', credentials=credentials)
            return service.freebusy().query(body=body).execute()

        busy_result = await asyncio.to_thread(_query)
        busy_times = busy_result.get('calendars', {}).get(user_email, {}).get('busy', [])
        return {
            "success": True,
            "has_conflicts": len(busy_times) > 0,
            "conflicts": busy_times
        }
    except Exception as e:
        logger.error(f"檢查日曆衝突失敗: {e}")
        return {
            "success": False,
            "error": str(e)
        }
#!/bin/bash
# scripts/setup-workspace-integration.sh
#
# Creates the Workspace integration service account, stores its key in Secret
# Manager and prints the manual domain-wide delegation steps.

# Abort on the first failure: without this a failed key creation would still
# go on to `gcloud secrets create` and print the success banner.
set -euo pipefail

PROJECT_ID="your-project-id"
SERVICE_ACCOUNT_NAME="workspace-integration-sa"
SERVICE_ACCOUNT_EMAIL="${SERVICE_ACCOUNT_NAME}@${PROJECT_ID}.iam.gserviceaccount.com"

echo "🔧 設定 Google Workspace 整合..."

# 1. Create the service account
gcloud iam service-accounts create "$SERVICE_ACCOUNT_NAME" \
  --project="$PROJECT_ID" \
  --display-name="Workspace Integration Service Account" \
  --description="用於 Gmail 和 Calendar API 的服務帳號"

# 2. Generate a key file
gcloud iam service-accounts keys create workspace-service-account.json \
  --project="$PROJECT_ID" \
  --iam-account="$SERVICE_ACCOUNT_EMAIL"

# 3. Enable the required APIs (Secret Manager is needed by step 4)
gcloud services enable gmail.googleapis.com --project="$PROJECT_ID"
gcloud services enable calendar-json.googleapis.com --project="$PROJECT_ID"
gcloud services enable secretmanager.googleapis.com --project="$PROJECT_ID"

# 4. Store the key in Secret Manager
gcloud secrets create workspace-sa-key \
  --project="$PROJECT_ID" \
  --data-file=workspace-service-account.json

echo "✅ 服務帳號建立完成"
echo ""
echo "📋 接下來請在 Google Workspace Admin Console 完成 Domain-wide Delegation 設定:"
echo ""
echo "1. 前往 Admin Console: https://admin.google.com"
echo "2. 進入「安全性」>「API 控制項」>「管理網域級委派」"
echo "3. 點擊「新增」並填入以下資訊:"
echo ""
echo " 客戶端 ID:"
# The delegation form asks for the service account's numeric unique ID
gcloud iam service-accounts describe "$SERVICE_ACCOUNT_EMAIL" \
  --project="$PROJECT_ID" --format='value(uniqueId)'
echo ""
echo " OAuth 範圍(複製整行):"
echo " https://www.googleapis.com/auth/gmail.send,https://www.googleapis.com/auth/calendar"
echo ""
echo "4. 儲存設定並等待生效(最多 24 小時)"
echo ""
echo "🔑 服務帳號金鑰已儲存到 Secret Manager: workspace-sa-key"
echo "⚠️ 原始檔案 workspace-service-account.json 請妥善保管或刪除"
# shared/code_executor.py
import asyncio
import logging
import tempfile
import subprocess
import os
import time
from typing import Dict, Any, List
import docker
import json
logger = logging.getLogger(__name__)
class SecureCodeExecutor:
    """Runs user-supplied Python inside a locked-down Docker container."""

    def __init__(self):
        """Connect to the Docker daemon from the environment and set the run limits."""
        self.docker_client = docker.from_env()
        # Per-run limits applied to every container
        self.max_execution_time = 30  # seconds
        self.max_memory = "128m"
        # Libraries callers are allowed to request
        self.allowed_libraries = set((
            'pandas', 'numpy', 'matplotlib', 'seaborn',
            'scipy', 'scikit-learn', 'plotly',
        ))
async def execute_code(
self,
code: str,
libraries: List[str] = None,
description: str = ""
) -> Dict[str, Any]:
"""在安全容器中執行代碼"""
start_time = time.time()
try:
# 1. 安全性檢查
if not self._is_safe_code(code):
return {
"success": False,
"error": "代碼包含不安全的操作"
}
# 2. 驗證庫依賴
libraries = libraries or []
invalid_libs = set(libraries) - self.allowed_libraries
if invalid_libs:
return {
"success": False,
"error": f"不允許的庫: {', '.join(invalid_libs)}"
}
# 3. 在容器中執行
result = await self._execute_in_container(code, libraries)
execution_time = time.time() - start_time
return {
"success": True,
"output": result["output"],
"execution_time": round(execution_time, 3),
"container_stats": result.get("stats", {})
}
except Exception as e:
logger.error(f"代碼執行失敗: {e}")
return {
"success": False,
"error": str(e)
}
def _is_safe_code(self, code: str) -> bool:
"""多層安全檢查"""
# 危險操作黑名單
dangerous_patterns = [
# 系統操作
'import os', 'import sys', 'import subprocess',
'os.', 'sys.', 'subprocess.',
# 檔案操作
'open(', 'file(', 'input(', 'eval(', 'exec(',
# 網路操作
'import socket', 'import urllib', 'import requests',
'socket.', 'urllib.', 'requests.',
# 危險模組
'import shutil', 'import glob', 'shutil.', 'glob.',
'__import__', 'getattr', 'setattr', 'delattr',
# 系統呼叫
'system(', 'popen(', 'spawn'
]
code_lower = code.lower()
# 檢查危險模式
for pattern in dangerous_patterns:
if pattern in code_lower:
logger.warning(f"發現危險模式: {pattern}")
return False
# 檢查代碼長度
if len(code) > 10000: # 10KB 限制
logger.warning("代碼過長")
return False
return True
async def _execute_in_container(self, code: str, libraries: List[str]) -> Dict[str, Any]:
    """Run the generated script in a throw-away container and collect its output.

    Args:
        code: User code, already screened by the caller.
        libraries: Libraries to import at the top of the generated script.

    Returns:
        ``{"output": str, "stats": dict}``. Failures are reported through
        ``stats["error"]`` (``"timeout"``, ``"container_error"`` or
        ``"system_error"``) rather than raised.
    """
    script_content = self._build_execution_script(code, libraries)

    def _run_blocking() -> Dict[str, Any]:
        # containers.run() has no timeout argument, so the container is
        # started detached and the deadline is enforced through wait().
        container = self.docker_client.containers.run(
            image="python:3.11-slim",
            command=["python", "-c", script_content],
            mem_limit=self.max_memory,
            network_disabled=True,  # no network access
            read_only=True,  # read-only root filesystem
            detach=True,
        )
        try:
            try:
                status = container.wait(timeout=self.max_execution_time)
            except Exception:
                # wait() raised: treat as deadline exceeded and stop the run.
                try:
                    container.kill()
                except docker.errors.APIError:
                    pass  # already stopped; nothing left to kill
                return {
                    "output": f"執行錯誤: 執行超過 {self.max_execution_time} 秒限制",
                    "stats": {"error": "timeout"}
                }

            output = container.logs(stdout=True, stderr=True).decode('utf-8', errors='replace')
            if status.get("StatusCode", 0) != 0:
                return {
                    "output": f"執行錯誤: {output}",
                    "stats": {"error": "container_error"}
                }
            return {
                "output": output,
                "stats": {"container_used": True}
            }
        finally:
            # Detached containers are not auto-removed here; always clean up.
            try:
                container.remove(force=True)
            except docker.errors.APIError as cleanup_error:
                logger.warning(f"容器清理失敗: {cleanup_error}")

    try:
        # The Docker SDK is synchronous; keep the event loop responsive
        return await asyncio.to_thread(_run_blocking)
    except Exception as e:
        return {
            "output": f"系統錯誤: {str(e)}",
            "stats": {"error": "system_error"}
        }
def _build_execution_script(self, code: str, libraries: List[str]) -> str:
"""建立執行腳本"""
# 導入允許的庫
imports = []
for lib in libraries:
if lib in self.allowed_libraries:
imports.append(f"import {lib}")
# 組合完整腳本
script = f"""
import sys
import io
from contextlib import redirect_stdout, redirect_stderr
# 導入允許的庫
{chr(10).join(imports)}
# 捕獲輸出
stdout_buffer = io.StringIO()
stderr_buffer = io.StringIO()
try:
with redirect_stdout(stdout_buffer), redirect_stderr(stderr_buffer):
# 用戶代碼
{chr(10).join(' ' + line for line in code.split(chr(10)))}
output = stdout_buffer.getvalue()
errors = stderr_buffer.getvalue()
if errors:
print(f"警告: {{errors}}")
if output:
print(output)
else:
print("代碼執行完成,無輸出內容")
except Exception as e:
print(f"執行錯誤: {{type(e).__name__}}: {{str(e)}}")
"""
return script.strip()
# services/chat/app/enhanced_chat_handler.py
import asyncio
import uuid
import time
import logging
from typing import Dict, Any
from shared.firebase_client import get_firebase_client
from .gemini_function_handler import GeminiFunctionHandler
from .discovery_client import DiscoveryEngineClient
from .models import ChatRequest, ChatResponse, ProcessingMode
logger = logging.getLogger(__name__)
class EnhancedChatHandler:
    """Chat entry point that combines Gemini function calling with Discovery Engine."""

    def __init__(self, project_id: str):
        """Create the Firebase client, the Gemini handler and the Discovery client.

        Args:
            project_id: Google Cloud project passed to both AI clients.
        """
        self.project_id = project_id
        self.firebase = get_firebase_client()
        # Both AI backends are bound to the same project
        self.gemini_handler, self.discovery = (
            GeminiFunctionHandler(project_id),
            DiscoveryEngineClient(project_id),
        )
async def process_enhanced_chat(self, request: ChatRequest) -> ChatResponse:
    """Open a chat session and route the request to the sync or async handler.

    Requests classified as SYNC are answered inline; every other mode is
    handed to the asynchronous handler. Failures are logged, reported via
    ``error_chat`` and re-raised.
    """
    chat_id = request.chat_id if request.chat_id else str(uuid.uuid4())
    start_time = time.time()
    try:
        await self.firebase.create_chat_session(
            chat_id,
            request.user_id,
            request.message
        )
        mode = await self._determine_processing_mode(request.message)
        # Anything that is not explicitly SYNC goes down the async path
        if mode != ProcessingMode.SYNC:
            return await self._handle_async_enhanced(request, chat_id, start_time)
        return await self._handle_sync_enhanced(request, chat_id, start_time)
    except Exception as exc:
        logger.error(f"增強對話處理失敗: {exc}")
        await self.firebase.error_chat(chat_id, str(exc))
        raise
async def _determine_processing_mode(self, message: str) -> ProcessingMode:
    """Pick a processing mode from keywords found in ``message``.

    Task keywords win over question keywords: a message containing both is
    classified ASYNC. Messages matching neither list are HYBRID.
    """
    # Phrases indicating a tool-heavy task
    complex_keywords = (
        '分析數據', '發送郵件', '安排會議', '執行代碼',
        '生成報告', '處理文件', '計算', '統計',
    )
    # Phrases indicating a simple question
    simple_keywords = ('什麼是', '如何', '為什麼', '解釋')

    if any(keyword in message for keyword in complex_keywords):
        return ProcessingMode.ASYNC
    if any(keyword in message for keyword in simple_keywords):
        return ProcessingMode.SYNC
    return ProcessingMode.HYBRID
async def _handle_sync_enhanced(
    self,
    request: ChatRequest,
    chat_id: str,
    start_time: float
) -> ChatResponse:
    """Answer a simple query inline, preferring Discovery Engine over Gemini.

    A Discovery Engine reply is used only when it comes with search results;
    otherwise the request falls back to Gemini.

    Args:
        request: Incoming chat request.
        chat_id: Chat session to complete or mark as failed.
        start_time: ``time.time()`` at request start, used for the processing time.

    Raises:
        Exception: Failures are logged, reported via ``error_chat`` and re-raised.
    """
    try:
        search_result = await self.discovery.conversational_search(
            request.message,
            chat_id,
            request.user_id
        )
        reply = search_result.get("reply")
        hits = search_result.get("search_results") or []

        # The search client answers failures with a fixed apology reply and no
        # results, so a non-empty reply alone does not mean the search worked.
        if reply and hits:
            response_text = reply
            # Cite up to two sources, skipping hits without a title
            sources = [hit["title"] for hit in hits[:2] if hit.get("title")]
            if sources:
                response_text += f"\n\n📚 參考來源: {', '.join(sources)}"

            processing_time = int((time.time() - start_time) * 1000)
            await self.firebase.complete_chat(
                chat_id,
                response_text,
                processing_time
            )
        else:
            # process_with_tools marks the chat complete itself, so it must
            # not be completed a second time here.
            gemini_result = await self.gemini_handler.process_with_tools(
                request.message,
                chat_id,
                request.user_id
            )
            response_text = gemini_result["message"]

        return ChatResponse(
            message=response_text,
            chat_id=chat_id,
            processing_mode=ProcessingMode.SYNC,
            is_complete=True,
            requires_followup=False
        )
    except Exception as e:
        logger.error(f"同步處理失敗: {e}")
        await self.firebase.error_chat(chat_id, str(e))
        raise
async def _handle_async_enhanced(
    self,
    request: ChatRequest,
    chat_id: str,
    start_time: float
) -> ChatResponse:
    """Acknowledge the request immediately and process it in a background task.

    Args:
        request: Incoming chat request.
        chat_id: Chat session the background task reports to.
        start_time: Accepted for signature parity with the sync handler; unused.

    Returns:
        An incomplete ``ChatResponse`` telling the client to wait for follow-up.
    """
    try:
        quick_response = "我正在分析您的需求並準備執行相關工具,請稍等片刻..."

        task = asyncio.create_task(
            self._background_enhanced_processing(request, chat_id)
        )
        # The event loop keeps only a weak reference to tasks; hold a strong
        # one until the task finishes so it cannot be garbage-collected mid-run.
        pending = self.__dict__.setdefault("_background_tasks", set())
        pending.add(task)
        task.add_done_callback(pending.discard)

        return ChatResponse(
            message=quick_response,
            chat_id=chat_id,
            processing_mode=ProcessingMode.ASYNC,
            is_complete=False,
            requires_followup=True,
            metadata={"estimated_time": "30-60秒"}
        )
    except Exception as e:
        logger.error(f"非同步處理失敗: {e}")
        await self.firebase.error_chat(chat_id, str(e))
        raise
async def _background_enhanced_processing(
self,
request: ChatRequest,
chat_id: str
):
"""背景增強處理"""
try:
# 載入對話歷史
conversation_history = await self._load_conversation_history(
request.user_id,
chat_id
)
# 使用 Gemini Function Calling 處理
result = await self.gemini_handler.process_with_tools(
request.message,
chat_id,
request.user_id,
conversation_history
)
logger.info(f"背景處理完成: {chat_id}")
except Exception as e:
logger.error(f"背景處理失敗: {e}")
await self.firebase.error_chat(chat_id, str(e))
async def _load_conversation_history(
self,
user_id: str,
chat_id: str
) -> List[Dict[str, Any]]:
"""載入對話歷史"""
try:
# 從 Firebase 載入最近對話
# 這裡簡化處理,實際可以從 memory-service 載入
return []
except Exception as e:
logger.error(f"載入對話歷史失敗: {e}")
return []
# cloudbuild-enhanced.yaml
# Builds the enhanced chat-service image, pushes it to Artifact Registry and
# deploys it to Cloud Run.
steps:
  # Build the enhanced chat-service image (build context is the repository root)
  - name: 'gcr.io/cloud-builders/docker'
    args: [
      'build',
      '-t', '${_REGION}-docker.pkg.dev/$PROJECT_ID/ai-assistant/chat-service-enhanced:$SHORT_SHA',
      '-f', 'services/chat/Dockerfile.enhanced',
      '.'
    ]
  # Push the image so the deploy step can reference it
  - name: 'gcr.io/cloud-builders/docker'
    args: ['push', '${_REGION}-docker.pkg.dev/$PROJECT_ID/ai-assistant/chat-service-enhanced:$SHORT_SHA']
  # Deploy to Cloud Run (gen2 execution environment, authenticated, restricted ingress)
  - name: 'gcr.io/google.com/cloudsdktool/cloud-sdk'
    entrypoint: 'gcloud'
    args:
      - 'run'
      - 'deploy'
      - 'chat-service-enhanced'
      - '--image=${_REGION}-docker.pkg.dev/$PROJECT_ID/ai-assistant/chat-service-enhanced:$SHORT_SHA'
      - '--region=${_REGION}'
      - '--platform=managed'
      - '--execution-environment=gen2'
      - '--service-account=chat-service-sa@$PROJECT_ID.iam.gserviceaccount.com'
      # NOTE(review): this exposes the key as an environment variable, while
      # WorkspaceTools loads credentials from a file path — confirm how the
      # app consumes WORKSPACE_SA_KEY.
      - '--set-secrets=WORKSPACE_SA_KEY=workspace-sa-key:latest'
      - '--set-env-vars=GCP_PROJECT_ID=$PROJECT_ID,VERTEX_LOCATION=${_REGION},DISCOVERY_DATASTORE_ID=ai-assistant-knowledge'
      - '--min-instances=2'
      - '--max-instances=20'
      - '--cpu=2'
      - '--memory=4Gi'
      - '--concurrency=100'
      - '--timeout=540s'
      # Callers must authenticate; traffic is accepted only from internal
      # sources and Cloud Load Balancing
      - '--no-allow-unauthenticated'
      - '--ingress=internal-and-cloud-load-balancing'
# Default region; can be overridden per build
substitutions:
  _REGION: 'asia-east1'
options:
  logging: CLOUD_LOGGING_ONLY
  machineType: 'E2_HIGHCPU_8'
# services/chat/Dockerfile.enhanced
# Image for the enhanced chat service (FastAPI app served by uvicorn).
FROM python:3.11-slim
WORKDIR /app

# System dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Docker CLI (used by the code executor).
# The repository key goes into a dedicated keyring referenced with signed-by
# (apt-key is deprecated and needs gnupg, which this image does not install),
# and the release codename and architecture are detected from the base image
# instead of being hard-coded.
RUN install -m 0755 -d /etc/apt/keyrings \
    && curl -fsSL https://download.docker.com/linux/debian/gpg -o /etc/apt/keyrings/docker.asc \
    && chmod a+r /etc/apt/keyrings/docker.asc \
    && echo "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.asc] https://download.docker.com/linux/debian $(. /etc/os-release && echo "$VERSION_CODENAME") stable" \
       > /etc/apt/sources.list.d/docker.list \
    && apt-get update \
    && apt-get install -y docker-ce-cli \
    && rm -rf /var/lib/apt/lists/*

# Install Python dependencies first so this layer is cached across code changes
COPY services/chat/requirements-enhanced.txt .
RUN pip install --no-cache-dir -r requirements-enhanced.txt

# Application code
COPY services/chat/app ./app
COPY shared ./shared

# Runtime configuration
ENV PYTHONPATH=/app
ENV PORT=8080
EXPOSE 8080
CMD ["python", "-m", "uvicorn", "app.main_enhanced:app", "--host", "0.0.0.0", "--port", "8080"]
# services/chat/requirements-enhanced.txt
fastapi==0.104.1
uvicorn[standard]==0.24.0
pydantic==2.5.0
pydantic-settings==2.1.0
# Google Cloud 服務
google-cloud-discoveryengine==0.11.0
google-cloud-aiplatform==1.38.0
google-cloud-firestore==2.13.1
google-api-python-client==2.108.0
google-auth==2.23.4
# Vertex AI
google-cloud-aiplatform[langchain]==1.38.0
# 工具執行
docker==6.1.3
pandas==2.1.3
numpy==1.25.2
matplotlib==3.8.1
# 其他依賴
httpx==0.25.2
pytz==2023.3
python-multipart==0.0.6
aiofiles==23.2.1
#!/bin/bash
# scripts/test-enhanced-system.sh
# Sends a set of sample requests to the deployed enhanced chat service.
PROJECT_ID="your-project-id"
REGION="asia-east1"
# Base URL of the deployed service; replace the placeholder host before running
API_BASE="https://chat-service-enhanced-xxx.run.app"
echo "🧪 測試增強版 AI 助手系統..."
# Identity token of the active gcloud account, sent as the Bearer token
TOKEN=$(gcloud auth print-identity-token)
# Test helper definition follows
# test_scenario NAME MESSAGE [EXPECTED_TOOLS]
# Posts MESSAGE to the enhanced chat endpoint under a fresh chat ID and prints
# the response plus a Firestore console link. EXPECTED_TOOLS is informational
# only; it is printed for the operator and not verified.
test_scenario() {
  local test_name="$1"
  local user_message="$2"
  local expected_tools="${3:-}"

  echo "🔍 測試場景: $test_name"
  echo "📝 用戶訊息: $user_message"
  echo "🛠️ 預期工具: $expected_tools"

  # $RANDOM is a bash builtin, so no dependency on shuf (absent on macOS)
  local chat_id="test-$(date +%s)-$RANDOM"

  # Build the JSON body with jq so quotes, backslashes and newlines in the
  # message are escaped correctly instead of breaking the payload.
  local payload
  payload=$(jq -n \
    --arg message "$user_message" \
    --arg chat_id "$chat_id" \
    '{message: $message, user_id: "test-user-enhanced", chat_id: $chat_id}')

  # Declared separately from the assignment so curl's exit status is not
  # masked by `local`.
  local response
  response=$(curl -s -H "Authorization: Bearer $TOKEN" \
    -H "Content-Type: application/json" \
    -X POST "$API_BASE/enhanced/chat" \
    -d "$payload")

  echo "📨 回應: $response"
  echo "🔗 Firebase 監控: https://console.firebase.google.com/project/$PROJECT_ID/firestore/data/~2Fchats~2F$chat_id"
  echo "---"

  # Give the service time to process before the next scenario
  sleep 5
}
# Run the test scenarios
echo "🚀 開始測試..."
# Test 1: knowledge retrieval
test_scenario \
"知識檢索" \
"什麼是人工智慧?請詳細解釋" \
"search_knowledge"
# Test 2: data analysis
test_scenario \
"數據分析" \
"分析銷售數據並生成圖表報告" \
"analyze_data,execute_code"
# Test 3: sending e-mail
test_scenario \
"郵件自動化" \
"發送項目進度報告給團隊成員" \
"send_email"
# Test 4: scheduling a meeting
test_scenario \
"會議管理" \
"安排明天下午2點的產品檢討會議" \
"schedule_meeting"
# Test 5: combined workflow using several tools
test_scenario \
"複合工作流程" \
"分析本月業績數據,生成報告並發給管理層,同時安排檢討會議" \
"analyze_data,execute_code,send_email,schedule_meeting"
# Test 6: code execution
test_scenario \
"代碼執行" \
"用 Python 計算斐波那契數列前20項並繪製圖表" \
"execute_code"
echo "✅ 所有測試場景已發送"
echo "📊 請檢查 Firebase Console 查看即時處理進度"
echo "📈 請檢查 Cloud Logging 查看詳細執行日誌"
# Query the service's health endpoint and pretty-print the JSON with jq
echo "🔍 檢查系統健康度..."
curl -s -H "Authorization: Bearer $TOKEN" "$API_BASE/health" | jq '.'
echo "🎉 測試完成!"
{
"displayName": "AI 助手增強版監控",
"widgets": [
{
"title": "Function Calling 成功率",
"xyChart": {
"dataSets": [{
"timeSeriesQuery": {
"timeSeriesFilter": {
"filter": "resource.type=\"cloud_run_revision\" AND resource.labels.service_name=\"chat-service-enhanced\"",
"aggregation": {
"alignmentPeriod": "60s",
"perSeriesAligner": "ALIGN_RATE",
"crossSeriesReducer": "REDUCE_MEAN"
}
}
}
}]
}
},
{
"title": "工具執行延遲",
"xyChart": {
"dataSets": [{
"timeSeriesQuery": {
"timeSeriesFilter": {
"filter": "metric.type=\"custom.googleapis.com/ai_assistant/tool_execution_time\"",
"aggregation": {
"alignmentPeriod": "300s",
"perSeriesAligner": "ALIGN_DELTA",
"crossSeriesReducer": "REDUCE_PERCENTILE_95"
}
}
}
}]
}
},
{
"title": "用戶滿意度",
"scorecard": {
"timeSeriesQuery": {
"timeSeriesFilter": {
"filter": "metric.type=\"custom.googleapis.com/ai_assistant/user_satisfaction\"",
"aggregation": {
"alignmentPeriod": "3600s",
"crossSeriesReducer": "REDUCE_MEAN"
}
}
}
}
}
]
}